Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce dedicated event publisher per document #1052

Merged
merged 5 commits into from
Nov 1, 2024
Merged

Conversation

hackerwins
Copy link
Member

@hackerwins hackerwins commented Oct 27, 2024

What this PR does / why we need it:

Introduce dedicated event publisher per document

Before:

  • PushPullChanges and WatchDocument routines directly published events
  • Event publishing was not controllable per document
  • Network issues could block event publishing routines

This change introduces a dedicated event publisher per document.

  • Centralize event publishing control
  • Prevent publishing routines from being blocked
  • Future improvements: Event throttling, Duplicate event prevention

Which issue(s) this PR fixes:

Fixes #

Special notes for your reviewer:

Does this PR introduce a user-facing change?:


Additional documentation:


Checklist:

  • Added relevant tests or not required
  • Didn't break anything

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced real-time synchronization for document attachments, enhancing responsiveness and consistency in client-server interactions.
    • Added a new BatchPublisher for efficient event publishing in a concurrent environment.
  • Bug Fixes

    • Improved error handling and resource management in the subscription and publishing processes.
  • Tests

    • Expanded test coverage for real-time synchronization across various scenarios, ensuring robust handling of document updates and client presence.

These updates improve the overall performance and reliability of document management within the application.

Copy link

coderabbitai bot commented Oct 27, 2024

Walkthrough

The changes in this pull request primarily focus on enhancing the document synchronization capabilities within the client-server architecture. The Attach method in the Client struct now utilizes a real-time synchronization option, replacing the previous manual synchronization approach. This is reflected in the modifications to the AttachOptions structure and the introduction of a BatchPublisher for managing event publishing in batches. Various tests have been updated to incorporate the new real-time synchronization feature, ensuring that document changes are monitored and propagated effectively.

Changes

File Change Summary
client/client.go Modified Attach method to initiate the watch loop based on opts.IsRealtime instead of opts.IsManual.
client/options.go Replaced IsManual with IsRealtime in AttachOptions; removed WithManualSync and added WithRealtimeSync functions.
server/backend/sync/memory/publisher.go Introduced BatchPublisher struct for batching and publishing document events; added methods for event processing and publishing.
server/backend/sync/memory/pubsub.go Added publisher field to Subscriptions struct; modified Publish method to utilize batching; added Close method for cleanup.
server/backend/sync/pubsub.go Added publishTimeout constant to improve code readability; updated Publish method to use this constant.
test/bench/grpc_bench_test.go Updated BenchmarkRPC function to use client.WithRealtimeSync() for document attachments.
test/integration/admin_test.go Enhanced TestAdmin to include client.WithRealtimeSync() for document attachment, affecting event propagation.
test/integration/auth_webhook_test.go Modified TestProjectAuthWebhook to support real-time synchronization in document attachment; expanded test cases for authorization logic.
test/integration/document_test.go Updated multiple test cases to include client.WithRealtimeSync() for real-time synchronization during document attachment.
test/integration/presence_test.go Enhanced TestPresence to include real-time synchronization; added new test scenarios for client presence updates.
test/integration/server_test.go Updated TestServer to include client.WithRealtimeSync() in document attachment.

Possibly related PRs

  • Enhance GetDocuments API by adding bulk retrieval #931: This PR enhances the GetDocuments API by adding bulk retrieval, which is related to the changes in the main PR that modify how document changes are monitored and synchronized, particularly with the introduction of real-time synchronization options.
  • Implement InitialRoot option for Document attachment #986: This PR introduces an InitialRoot option for document attachment, which modifies the Attach method in a way that could interact with the changes made in the main PR regarding document synchronization and attachment logic.
  • Detach documents when client is deactivated #1036: This PR implements functionality to detach documents when a client is deactivated, which relates to the changes in the main PR that affect how documents are managed and synchronized in real-time.
  • Introduce cmap for distributing mutexes per documents #1051: This PR introduces a concurrent map for distributing mutexes per document, which could enhance the performance of the document synchronization process described in the main PR by allowing for more efficient handling of concurrent operations.

Suggested labels

enhancement 🌟


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go Benchmark

Benchmark suite Current: 0ef9d5a Previous: 54553fe Ratio
BenchmarkDocument/constructor_test 1491 ns/op 1337 B/op 24 allocs/op 1490 ns/op 1337 B/op 24 allocs/op 1.00
BenchmarkDocument/constructor_test - ns/op 1491 ns/op 1490 ns/op 1.00
BenchmarkDocument/constructor_test - B/op 1337 B/op 1337 B/op 1
BenchmarkDocument/constructor_test - allocs/op 24 allocs/op 24 allocs/op 1
BenchmarkDocument/status_test 958.8 ns/op 1305 B/op 22 allocs/op 958.9 ns/op 1305 B/op 22 allocs/op 1.00
BenchmarkDocument/status_test - ns/op 958.8 ns/op 958.9 ns/op 1.00
BenchmarkDocument/status_test - B/op 1305 B/op 1305 B/op 1
BenchmarkDocument/status_test - allocs/op 22 allocs/op 22 allocs/op 1
BenchmarkDocument/equals_test 8251 ns/op 7529 B/op 134 allocs/op 7783 ns/op 7529 B/op 134 allocs/op 1.06
BenchmarkDocument/equals_test - ns/op 8251 ns/op 7783 ns/op 1.06
BenchmarkDocument/equals_test - B/op 7529 B/op 7529 B/op 1
BenchmarkDocument/equals_test - allocs/op 134 allocs/op 134 allocs/op 1
BenchmarkDocument/nested_update_test 18129 ns/op 12395 B/op 264 allocs/op 17005 ns/op 12395 B/op 264 allocs/op 1.07
BenchmarkDocument/nested_update_test - ns/op 18129 ns/op 17005 ns/op 1.07
BenchmarkDocument/nested_update_test - B/op 12395 B/op 12395 B/op 1
BenchmarkDocument/nested_update_test - allocs/op 264 allocs/op 264 allocs/op 1
BenchmarkDocument/delete_test 23139 ns/op 15923 B/op 347 allocs/op 23195 ns/op 15923 B/op 347 allocs/op 1.00
BenchmarkDocument/delete_test - ns/op 23139 ns/op 23195 ns/op 1.00
BenchmarkDocument/delete_test - B/op 15923 B/op 15923 B/op 1
BenchmarkDocument/delete_test - allocs/op 347 allocs/op 347 allocs/op 1
BenchmarkDocument/object_test 8746 ns/op 7073 B/op 122 allocs/op 8738 ns/op 7073 B/op 122 allocs/op 1.00
BenchmarkDocument/object_test - ns/op 8746 ns/op 8738 ns/op 1.00
BenchmarkDocument/object_test - B/op 7073 B/op 7073 B/op 1
BenchmarkDocument/object_test - allocs/op 122 allocs/op 122 allocs/op 1
BenchmarkDocument/array_test 30207 ns/op 12203 B/op 278 allocs/op 30242 ns/op 12203 B/op 278 allocs/op 1.00
BenchmarkDocument/array_test - ns/op 30207 ns/op 30242 ns/op 1.00
BenchmarkDocument/array_test - B/op 12203 B/op 12203 B/op 1
BenchmarkDocument/array_test - allocs/op 278 allocs/op 278 allocs/op 1
BenchmarkDocument/text_test 31987 ns/op 15323 B/op 492 allocs/op 37209 ns/op 15324 B/op 492 allocs/op 0.86
BenchmarkDocument/text_test - ns/op 31987 ns/op 37209 ns/op 0.86
BenchmarkDocument/text_test - B/op 15323 B/op 15324 B/op 1.00
BenchmarkDocument/text_test - allocs/op 492 allocs/op 492 allocs/op 1
BenchmarkDocument/text_composition_test 30520 ns/op 18718 B/op 504 allocs/op 30628 ns/op 18718 B/op 504 allocs/op 1.00
BenchmarkDocument/text_composition_test - ns/op 30520 ns/op 30628 ns/op 1.00
BenchmarkDocument/text_composition_test - B/op 18718 B/op 18718 B/op 1
BenchmarkDocument/text_composition_test - allocs/op 504 allocs/op 504 allocs/op 1
BenchmarkDocument/rich_text_test 84743 ns/op 40181 B/op 1183 allocs/op 83710 ns/op 40180 B/op 1183 allocs/op 1.01
BenchmarkDocument/rich_text_test - ns/op 84743 ns/op 83710 ns/op 1.01
BenchmarkDocument/rich_text_test - B/op 40181 B/op 40180 B/op 1.00
BenchmarkDocument/rich_text_test - allocs/op 1183 allocs/op 1183 allocs/op 1
BenchmarkDocument/counter_test 18523 ns/op 11874 B/op 258 allocs/op 18707 ns/op 11874 B/op 258 allocs/op 0.99
BenchmarkDocument/counter_test - ns/op 18523 ns/op 18707 ns/op 0.99
BenchmarkDocument/counter_test - B/op 11874 B/op 11874 B/op 1
BenchmarkDocument/counter_test - allocs/op 258 allocs/op 258 allocs/op 1
BenchmarkDocument/text_edit_gc_100 1312953 ns/op 872573 B/op 17282 allocs/op 1320996 ns/op 872535 B/op 17282 allocs/op 0.99
BenchmarkDocument/text_edit_gc_100 - ns/op 1312953 ns/op 1320996 ns/op 0.99
BenchmarkDocument/text_edit_gc_100 - B/op 872573 B/op 872535 B/op 1.00
BenchmarkDocument/text_edit_gc_100 - allocs/op 17282 allocs/op 17282 allocs/op 1
BenchmarkDocument/text_edit_gc_1000 49784447 ns/op 50546893 B/op 186737 allocs/op 50330289 ns/op 50547472 B/op 186743 allocs/op 0.99
BenchmarkDocument/text_edit_gc_1000 - ns/op 49784447 ns/op 50330289 ns/op 0.99
BenchmarkDocument/text_edit_gc_1000 - B/op 50546893 B/op 50547472 B/op 1.00
BenchmarkDocument/text_edit_gc_1000 - allocs/op 186737 allocs/op 186743 allocs/op 1.00
BenchmarkDocument/text_split_gc_100 1936182 ns/op 1589045 B/op 15951 allocs/op 1936868 ns/op 1589023 B/op 15950 allocs/op 1.00
BenchmarkDocument/text_split_gc_100 - ns/op 1936182 ns/op 1936868 ns/op 1.00
BenchmarkDocument/text_split_gc_100 - B/op 1589045 B/op 1589023 B/op 1.00
BenchmarkDocument/text_split_gc_100 - allocs/op 15951 allocs/op 15950 allocs/op 1.00
BenchmarkDocument/text_split_gc_1000 115665048 ns/op 141482212 B/op 186141 allocs/op 115914520 ns/op 141482845 B/op 186153 allocs/op 1.00
BenchmarkDocument/text_split_gc_1000 - ns/op 115665048 ns/op 115914520 ns/op 1.00
BenchmarkDocument/text_split_gc_1000 - B/op 141482212 B/op 141482845 B/op 1.00
BenchmarkDocument/text_split_gc_1000 - allocs/op 186141 allocs/op 186153 allocs/op 1.00
BenchmarkDocument/text_delete_all_10000 16497904 ns/op 10213118 B/op 55686 allocs/op 16239621 ns/op 10213269 B/op 55685 allocs/op 1.02
BenchmarkDocument/text_delete_all_10000 - ns/op 16497904 ns/op 16239621 ns/op 1.02
BenchmarkDocument/text_delete_all_10000 - B/op 10213118 B/op 10213269 B/op 1.00
BenchmarkDocument/text_delete_all_10000 - allocs/op 55686 allocs/op 55685 allocs/op 1.00
BenchmarkDocument/text_delete_all_100000 307824378 ns/op 143007316 B/op 561764 allocs/op 284433695 ns/op 142961260 B/op 561653 allocs/op 1.08
BenchmarkDocument/text_delete_all_100000 - ns/op 307824378 ns/op 284433695 ns/op 1.08
BenchmarkDocument/text_delete_all_100000 - B/op 143007316 B/op 142961260 B/op 1.00
BenchmarkDocument/text_delete_all_100000 - allocs/op 561764 allocs/op 561653 allocs/op 1.00
BenchmarkDocument/text_100 227675 ns/op 120490 B/op 5182 allocs/op 219778 ns/op 120491 B/op 5182 allocs/op 1.04
BenchmarkDocument/text_100 - ns/op 227675 ns/op 219778 ns/op 1.04
BenchmarkDocument/text_100 - B/op 120490 B/op 120491 B/op 1.00
BenchmarkDocument/text_100 - allocs/op 5182 allocs/op 5182 allocs/op 1
BenchmarkDocument/text_1000 2469110 ns/op 1171281 B/op 51086 allocs/op 2407650 ns/op 1171295 B/op 51086 allocs/op 1.03
BenchmarkDocument/text_1000 - ns/op 2469110 ns/op 2407650 ns/op 1.03
BenchmarkDocument/text_1000 - B/op 1171281 B/op 1171295 B/op 1.00
BenchmarkDocument/text_1000 - allocs/op 51086 allocs/op 51086 allocs/op 1
BenchmarkDocument/array_1000 1271137 ns/op 1091618 B/op 11833 allocs/op 1209051 ns/op 1091669 B/op 11834 allocs/op 1.05
BenchmarkDocument/array_1000 - ns/op 1271137 ns/op 1209051 ns/op 1.05
BenchmarkDocument/array_1000 - B/op 1091618 B/op 1091669 B/op 1.00
BenchmarkDocument/array_1000 - allocs/op 11833 allocs/op 11834 allocs/op 1.00
BenchmarkDocument/array_10000 13506687 ns/op 9799956 B/op 120297 allocs/op 13171193 ns/op 9800320 B/op 120298 allocs/op 1.03
BenchmarkDocument/array_10000 - ns/op 13506687 ns/op 13171193 ns/op 1.03
BenchmarkDocument/array_10000 - B/op 9799956 B/op 9800320 B/op 1.00
BenchmarkDocument/array_10000 - allocs/op 120297 allocs/op 120298 allocs/op 1.00
BenchmarkDocument/array_gc_100 155738 ns/op 133266 B/op 1266 allocs/op 149367 ns/op 133267 B/op 1266 allocs/op 1.04
BenchmarkDocument/array_gc_100 - ns/op 155738 ns/op 149367 ns/op 1.04
BenchmarkDocument/array_gc_100 - B/op 133266 B/op 133267 B/op 1.00
BenchmarkDocument/array_gc_100 - allocs/op 1266 allocs/op 1266 allocs/op 1
BenchmarkDocument/array_gc_1000 1453702 ns/op 1159750 B/op 12883 allocs/op 1402528 ns/op 1159638 B/op 12882 allocs/op 1.04
BenchmarkDocument/array_gc_1000 - ns/op 1453702 ns/op 1402528 ns/op 1.04
BenchmarkDocument/array_gc_1000 - B/op 1159750 B/op 1159638 B/op 1.00
BenchmarkDocument/array_gc_1000 - allocs/op 12883 allocs/op 12882 allocs/op 1.00
BenchmarkDocument/counter_1000 211706 ns/op 193337 B/op 5773 allocs/op 204358 ns/op 193336 B/op 5773 allocs/op 1.04
BenchmarkDocument/counter_1000 - ns/op 211706 ns/op 204358 ns/op 1.04
BenchmarkDocument/counter_1000 - B/op 193337 B/op 193336 B/op 1.00
BenchmarkDocument/counter_1000 - allocs/op 5773 allocs/op 5773 allocs/op 1
BenchmarkDocument/counter_10000 2218138 ns/op 2088253 B/op 59780 allocs/op 2187941 ns/op 2088253 B/op 59780 allocs/op 1.01
BenchmarkDocument/counter_10000 - ns/op 2218138 ns/op 2187941 ns/op 1.01
BenchmarkDocument/counter_10000 - B/op 2088253 B/op 2088253 B/op 1
BenchmarkDocument/counter_10000 - allocs/op 59780 allocs/op 59780 allocs/op 1
BenchmarkDocument/object_1000 1448829 ns/op 1428219 B/op 9850 allocs/op 1382180 ns/op 1428208 B/op 9850 allocs/op 1.05
BenchmarkDocument/object_1000 - ns/op 1448829 ns/op 1382180 ns/op 1.05
BenchmarkDocument/object_1000 - B/op 1428219 B/op 1428208 B/op 1.00
BenchmarkDocument/object_1000 - allocs/op 9850 allocs/op 9850 allocs/op 1
BenchmarkDocument/object_10000 15040700 ns/op 12164173 B/op 100559 allocs/op 15516399 ns/op 12166076 B/op 100564 allocs/op 0.97
BenchmarkDocument/object_10000 - ns/op 15040700 ns/op 15516399 ns/op 0.97
BenchmarkDocument/object_10000 - B/op 12164173 B/op 12166076 B/op 1.00
BenchmarkDocument/object_10000 - allocs/op 100559 allocs/op 100564 allocs/op 1.00
BenchmarkDocument/tree_100 1072793 ns/op 943952 B/op 6103 allocs/op 1015119 ns/op 943956 B/op 6103 allocs/op 1.06
BenchmarkDocument/tree_100 - ns/op 1072793 ns/op 1015119 ns/op 1.06
BenchmarkDocument/tree_100 - B/op 943952 B/op 943956 B/op 1.00
BenchmarkDocument/tree_100 - allocs/op 6103 allocs/op 6103 allocs/op 1
BenchmarkDocument/tree_1000 78035902 ns/op 86460060 B/op 60116 allocs/op 71984632 ns/op 86460531 B/op 60116 allocs/op 1.08
BenchmarkDocument/tree_1000 - ns/op 78035902 ns/op 71984632 ns/op 1.08
BenchmarkDocument/tree_1000 - B/op 86460060 B/op 86460531 B/op 1.00
BenchmarkDocument/tree_1000 - allocs/op 60116 allocs/op 60116 allocs/op 1
BenchmarkDocument/tree_10000 9537716330 ns/op 8580652912 B/op 600224 allocs/op 9265501583 ns/op 8580668272 B/op 600211 allocs/op 1.03
BenchmarkDocument/tree_10000 - ns/op 9537716330 ns/op 9265501583 ns/op 1.03
BenchmarkDocument/tree_10000 - B/op 8580652912 B/op 8580668272 B/op 1.00
BenchmarkDocument/tree_10000 - allocs/op 600224 allocs/op 600211 allocs/op 1.00
BenchmarkDocument/tree_delete_all_1000 79766775 ns/op 87510684 B/op 75272 allocs/op 74397068 ns/op 87510854 B/op 75272 allocs/op 1.07
BenchmarkDocument/tree_delete_all_1000 - ns/op 79766775 ns/op 74397068 ns/op 1.07
BenchmarkDocument/tree_delete_all_1000 - B/op 87510684 B/op 87510854 B/op 1.00
BenchmarkDocument/tree_delete_all_1000 - allocs/op 75272 allocs/op 75272 allocs/op 1
BenchmarkDocument/tree_edit_gc_100 4009001 ns/op 4147355 B/op 15147 allocs/op 3787251 ns/op 4147288 B/op 15147 allocs/op 1.06
BenchmarkDocument/tree_edit_gc_100 - ns/op 4009001 ns/op 3787251 ns/op 1.06
BenchmarkDocument/tree_edit_gc_100 - B/op 4147355 B/op 4147288 B/op 1.00
BenchmarkDocument/tree_edit_gc_100 - allocs/op 15147 allocs/op 15147 allocs/op 1
BenchmarkDocument/tree_edit_gc_1000 318156091 ns/op 383744360 B/op 154857 allocs/op 296511603 ns/op 383747418 B/op 154859 allocs/op 1.07
BenchmarkDocument/tree_edit_gc_1000 - ns/op 318156091 ns/op 296511603 ns/op 1.07
BenchmarkDocument/tree_edit_gc_1000 - B/op 383744360 B/op 383747418 B/op 1.00
BenchmarkDocument/tree_edit_gc_1000 - allocs/op 154857 allocs/op 154859 allocs/op 1.00
BenchmarkDocument/tree_split_gc_100 2676740 ns/op 2413134 B/op 11131 allocs/op 2484493 ns/op 2413014 B/op 11131 allocs/op 1.08
BenchmarkDocument/tree_split_gc_100 - ns/op 2676740 ns/op 2484493 ns/op 1.08
BenchmarkDocument/tree_split_gc_100 - B/op 2413134 B/op 2413014 B/op 1.00
BenchmarkDocument/tree_split_gc_100 - allocs/op 11131 allocs/op 11131 allocs/op 1
BenchmarkDocument/tree_split_gc_1000 194417898 ns/op 222252361 B/op 122010 allocs/op 179575177 ns/op 222251412 B/op 121995 allocs/op 1.08
BenchmarkDocument/tree_split_gc_1000 - ns/op 194417898 ns/op 179575177 ns/op 1.08
BenchmarkDocument/tree_split_gc_1000 - B/op 222252361 B/op 222251412 B/op 1.00
BenchmarkDocument/tree_split_gc_1000 - allocs/op 122010 allocs/op 121995 allocs/op 1.00
BenchmarkRPC/client_to_server 413407248 ns/op 19761048 B/op 224877 allocs/op 422364805 ns/op 24879242 B/op 228241 allocs/op 0.98
BenchmarkRPC/client_to_server - ns/op 413407248 ns/op 422364805 ns/op 0.98
BenchmarkRPC/client_to_server - B/op 19761048 B/op 24879242 B/op 0.79
BenchmarkRPC/client_to_server - allocs/op 224877 allocs/op 228241 allocs/op 0.99
BenchmarkRPC/client_to_client_via_server 750814144 ns/op 45209716 B/op 473366 allocs/op 752579788 ns/op 44313012 B/op 468372 allocs/op 1.00
BenchmarkRPC/client_to_client_via_server - ns/op 750814144 ns/op 752579788 ns/op 1.00
BenchmarkRPC/client_to_client_via_server - B/op 45209716 B/op 44313012 B/op 1.02
BenchmarkRPC/client_to_client_via_server - allocs/op 473366 allocs/op 468372 allocs/op 1.01
BenchmarkRPC/attach_large_document 1843327503 ns/op 3020297752 B/op 13622 allocs/op 2037023699 ns/op 3037869536 B/op 15043 allocs/op 0.90
BenchmarkRPC/attach_large_document - ns/op 1843327503 ns/op 2037023699 ns/op 0.90
BenchmarkRPC/attach_large_document - B/op 3020297752 B/op 3037869536 B/op 0.99
BenchmarkRPC/attach_large_document - allocs/op 13622 allocs/op 15043 allocs/op 0.91
BenchmarkRPC/adminCli_to_server 523096431 ns/op 36401212 B/op 289714 allocs/op 529612624 ns/op 35959696 B/op 289569 allocs/op 0.99
BenchmarkRPC/adminCli_to_server - ns/op 523096431 ns/op 529612624 ns/op 0.99
BenchmarkRPC/adminCli_to_server - B/op 36401212 B/op 35959696 B/op 1.01
BenchmarkRPC/adminCli_to_server - allocs/op 289714 allocs/op 289569 allocs/op 1.00
BenchmarkLocker 65.1 ns/op 16 B/op 1 allocs/op 65.69 ns/op 16 B/op 1 allocs/op 0.99
BenchmarkLocker - ns/op 65.1 ns/op 65.69 ns/op 0.99
BenchmarkLocker - B/op 16 B/op 16 B/op 1
BenchmarkLocker - allocs/op 1 allocs/op 1 allocs/op 1
BenchmarkLockerParallel 39.12 ns/op 0 B/op 0 allocs/op 39.85 ns/op 0 B/op 0 allocs/op 0.98
BenchmarkLockerParallel - ns/op 39.12 ns/op 39.85 ns/op 0.98
BenchmarkLockerParallel - B/op 0 B/op 0 B/op 1
BenchmarkLockerParallel - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkLockerMoreKeys 148.7 ns/op 15 B/op 0 allocs/op 151.3 ns/op 15 B/op 0 allocs/op 0.98
BenchmarkLockerMoreKeys - ns/op 148.7 ns/op 151.3 ns/op 0.98
BenchmarkLockerMoreKeys - B/op 15 B/op 15 B/op 1
BenchmarkLockerMoreKeys - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkChange/Push_10_Changes 4416655 ns/op 144580 B/op 1572 allocs/op 4370215 ns/op 143561 B/op 1571 allocs/op 1.01
BenchmarkChange/Push_10_Changes - ns/op 4416655 ns/op 4370215 ns/op 1.01
BenchmarkChange/Push_10_Changes - B/op 144580 B/op 143561 B/op 1.01
BenchmarkChange/Push_10_Changes - allocs/op 1572 allocs/op 1571 allocs/op 1.00
BenchmarkChange/Push_100_Changes 16345338 ns/op 718194 B/op 8192 allocs/op 16165670 ns/op 702397 B/op 8186 allocs/op 1.01
BenchmarkChange/Push_100_Changes - ns/op 16345338 ns/op 16165670 ns/op 1.01
BenchmarkChange/Push_100_Changes - B/op 718194 B/op 702397 B/op 1.02
BenchmarkChange/Push_100_Changes - allocs/op 8192 allocs/op 8186 allocs/op 1.00
BenchmarkChange/Push_1000_Changes 132822321 ns/op 7086680 B/op 77174 allocs/op 126131500 ns/op 6745015 B/op 77155 allocs/op 1.05
BenchmarkChange/Push_1000_Changes - ns/op 132822321 ns/op 126131500 ns/op 1.05
BenchmarkChange/Push_1000_Changes - B/op 7086680 B/op 6745015 B/op 1.05
BenchmarkChange/Push_1000_Changes - allocs/op 77174 allocs/op 77155 allocs/op 1.00
BenchmarkChange/Pull_10_Changes 3611302 ns/op 123838 B/op 1403 allocs/op 3588196 ns/op 123936 B/op 1403 allocs/op 1.01
BenchmarkChange/Pull_10_Changes - ns/op 3611302 ns/op 3588196 ns/op 1.01
BenchmarkChange/Pull_10_Changes - B/op 123838 B/op 123936 B/op 1.00
BenchmarkChange/Pull_10_Changes - allocs/op 1403 allocs/op 1403 allocs/op 1
BenchmarkChange/Pull_100_Changes 5153774 ns/op 351880 B/op 4949 allocs/op 5076218 ns/op 351721 B/op 4948 allocs/op 1.02
BenchmarkChange/Pull_100_Changes - ns/op 5153774 ns/op 5076218 ns/op 1.02
BenchmarkChange/Pull_100_Changes - B/op 351880 B/op 351721 B/op 1.00
BenchmarkChange/Pull_100_Changes - allocs/op 4949 allocs/op 4948 allocs/op 1.00
BenchmarkChange/Pull_1000_Changes 10480344 ns/op 2222197 B/op 42673 allocs/op 10089825 ns/op 2220341 B/op 42666 allocs/op 1.04
BenchmarkChange/Pull_1000_Changes - ns/op 10480344 ns/op 10089825 ns/op 1.04
BenchmarkChange/Pull_1000_Changes - B/op 2222197 B/op 2220341 B/op 1.00
BenchmarkChange/Pull_1000_Changes - allocs/op 42673 allocs/op 42666 allocs/op 1.00
BenchmarkSnapshot/Push_3KB_snapshot 18615936 ns/op 852017 B/op 8194 allocs/op 18519123 ns/op 810435 B/op 8187 allocs/op 1.01
BenchmarkSnapshot/Push_3KB_snapshot - ns/op 18615936 ns/op 18519123 ns/op 1.01
BenchmarkSnapshot/Push_3KB_snapshot - B/op 852017 B/op 810435 B/op 1.05
BenchmarkSnapshot/Push_3KB_snapshot - allocs/op 8194 allocs/op 8187 allocs/op 1.00
BenchmarkSnapshot/Push_30KB_snapshot 135379703 ns/op 7655684 B/op 79347 allocs/op 131752156 ns/op 7171758 B/op 77952 allocs/op 1.03
BenchmarkSnapshot/Push_30KB_snapshot - ns/op 135379703 ns/op 131752156 ns/op 1.03
BenchmarkSnapshot/Push_30KB_snapshot - B/op 7655684 B/op 7171758 B/op 1.07
BenchmarkSnapshot/Push_30KB_snapshot - allocs/op 79347 allocs/op 77952 allocs/op 1.02
BenchmarkSnapshot/Pull_3KB_snapshot 7400337 ns/op 1141931 B/op 19420 allocs/op 7118534 ns/op 1139275 B/op 19415 allocs/op 1.04
BenchmarkSnapshot/Pull_3KB_snapshot - ns/op 7400337 ns/op 7118534 ns/op 1.04
BenchmarkSnapshot/Pull_3KB_snapshot - B/op 1141931 B/op 1139275 B/op 1.00
BenchmarkSnapshot/Pull_3KB_snapshot - allocs/op 19420 allocs/op 19415 allocs/op 1.00
BenchmarkSnapshot/Pull_30KB_snapshot 18787326 ns/op 9328603 B/op 187581 allocs/op 17695086 ns/op 9295383 B/op 187556 allocs/op 1.06
BenchmarkSnapshot/Pull_30KB_snapshot - ns/op 18787326 ns/op 17695086 ns/op 1.06
BenchmarkSnapshot/Pull_30KB_snapshot - B/op 9328603 B/op 9295383 B/op 1.00
BenchmarkSnapshot/Pull_30KB_snapshot - allocs/op 187581 allocs/op 187556 allocs/op 1.00
BenchmarkSplayTree/stress_test_100000 0.1972 ns/op 0 B/op 0 allocs/op 0.1921 ns/op 0 B/op 0 allocs/op 1.03
BenchmarkSplayTree/stress_test_100000 - ns/op 0.1972 ns/op 0.1921 ns/op 1.03
BenchmarkSplayTree/stress_test_100000 - B/op 0 B/op 0 B/op 1
BenchmarkSplayTree/stress_test_100000 - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkSplayTree/stress_test_200000 0.3947 ns/op 0 B/op 0 allocs/op 0.3907 ns/op 0 B/op 0 allocs/op 1.01
BenchmarkSplayTree/stress_test_200000 - ns/op 0.3947 ns/op 0.3907 ns/op 1.01
BenchmarkSplayTree/stress_test_200000 - B/op 0 B/op 0 B/op 1
BenchmarkSplayTree/stress_test_200000 - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkSplayTree/stress_test_300000 0.5672 ns/op 0 B/op 0 allocs/op 0.5895 ns/op 0 B/op 0 allocs/op 0.96
BenchmarkSplayTree/stress_test_300000 - ns/op 0.5672 ns/op 0.5895 ns/op 0.96
BenchmarkSplayTree/stress_test_300000 - B/op 0 B/op 0 B/op 1
BenchmarkSplayTree/stress_test_300000 - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkSplayTree/random_access_100000 0.01276 ns/op 0 B/op 0 allocs/op 0.01256 ns/op 0 B/op 0 allocs/op 1.02
BenchmarkSplayTree/random_access_100000 - ns/op 0.01276 ns/op 0.01256 ns/op 1.02
BenchmarkSplayTree/random_access_100000 - B/op 0 B/op 0 B/op 1
BenchmarkSplayTree/random_access_100000 - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkSplayTree/random_access_200000 0.03258 ns/op 0 B/op 0 allocs/op 0.0242 ns/op 0 B/op 0 allocs/op 1.35
BenchmarkSplayTree/random_access_200000 - ns/op 0.03258 ns/op 0.0242 ns/op 1.35
BenchmarkSplayTree/random_access_200000 - B/op 0 B/op 0 B/op 1
BenchmarkSplayTree/random_access_200000 - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkSplayTree/random_access_300000 0.04823 ns/op 0 B/op 0 allocs/op 0.03583 ns/op 0 B/op 0 allocs/op 1.35
BenchmarkSplayTree/random_access_300000 - ns/op 0.04823 ns/op 0.03583 ns/op 1.35
BenchmarkSplayTree/random_access_300000 - B/op 0 B/op 0 B/op 1
BenchmarkSplayTree/random_access_300000 - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkSplayTree/editing_trace_bench 0.001927 ns/op 0 B/op 0 allocs/op 0.001771 ns/op 0 B/op 0 allocs/op 1.09
BenchmarkSplayTree/editing_trace_bench - ns/op 0.001927 ns/op 0.001771 ns/op 1.09
BenchmarkSplayTree/editing_trace_bench - B/op 0 B/op 0 B/op 1
BenchmarkSplayTree/editing_trace_bench - allocs/op 0 allocs/op 0 allocs/op 1
BenchmarkSync/memory_sync_10_test 8181 ns/op 3765 B/op 69 allocs/op 6767 ns/op 1286 B/op 38 allocs/op 1.21
BenchmarkSync/memory_sync_10_test - ns/op 8181 ns/op 6767 ns/op 1.21
BenchmarkSync/memory_sync_10_test - B/op 3765 B/op 1286 B/op 2.93
BenchmarkSync/memory_sync_10_test - allocs/op 69 allocs/op 38 allocs/op 1.82
BenchmarkSync/memory_sync_100_test 53531 ns/op 11120 B/op 304 allocs/op 51389 ns/op 8635 B/op 273 allocs/op 1.04
BenchmarkSync/memory_sync_100_test - ns/op 53531 ns/op 51389 ns/op 1.04
BenchmarkSync/memory_sync_100_test - B/op 11120 B/op 8635 B/op 1.29
BenchmarkSync/memory_sync_100_test - allocs/op 304 allocs/op 273 allocs/op 1.11
BenchmarkSync/memory_sync_1000_test 591655 ns/op 76849 B/op 2139 allocs/op 586130 ns/op 74167 B/op 2107 allocs/op 1.01
BenchmarkSync/memory_sync_1000_test - ns/op 591655 ns/op 586130 ns/op 1.01
BenchmarkSync/memory_sync_1000_test - B/op 76849 B/op 74167 B/op 1.04
BenchmarkSync/memory_sync_1000_test - allocs/op 2139 allocs/op 2107 allocs/op 1.02
BenchmarkSync/memory_sync_10000_test 7171075 ns/op 761361 B/op 20559 allocs/op 7045938 ns/op 736035 B/op 20234 allocs/op 1.02
BenchmarkSync/memory_sync_10000_test - ns/op 7171075 ns/op 7045938 ns/op 1.02
BenchmarkSync/memory_sync_10000_test - B/op 761361 B/op 736035 B/op 1.03
BenchmarkSync/memory_sync_10000_test - allocs/op 20559 allocs/op 20234 allocs/op 1.02
BenchmarkTextEditing 4937928820 ns/op 3982537648 B/op 20647486 allocs/op 4708419344 ns/op 3982595968 B/op 20647700 allocs/op 1.05
BenchmarkTextEditing - ns/op 4937928820 ns/op 4708419344 ns/op 1.05
BenchmarkTextEditing - B/op 3982537648 B/op 3982595968 B/op 1.00
BenchmarkTextEditing - allocs/op 20647486 allocs/op 20647700 allocs/op 1.00
BenchmarkTree/10000_vertices_to_protobuf 4031236 ns/op 6263038 B/op 70025 allocs/op 3442707 ns/op 6262971 B/op 70025 allocs/op 1.17
BenchmarkTree/10000_vertices_to_protobuf - ns/op 4031236 ns/op 3442707 ns/op 1.17
BenchmarkTree/10000_vertices_to_protobuf - B/op 6263038 B/op 6262971 B/op 1.00
BenchmarkTree/10000_vertices_to_protobuf - allocs/op 70025 allocs/op 70025 allocs/op 1
BenchmarkTree/10000_vertices_from_protobuf 216252944 ns/op 442172640 B/op 290046 allocs/op 153270403 ns/op 442172760 B/op 290052 allocs/op 1.41
BenchmarkTree/10000_vertices_from_protobuf - ns/op 216252944 ns/op 153270403 ns/op 1.41
BenchmarkTree/10000_vertices_from_protobuf - B/op 442172640 B/op 442172760 B/op 1.00
BenchmarkTree/10000_vertices_from_protobuf - allocs/op 290046 allocs/op 290052 allocs/op 1.00
BenchmarkTree/20000_vertices_to_protobuf 8792337 ns/op 12721743 B/op 140028 allocs/op 7783107 ns/op 12716972 B/op 140028 allocs/op 1.13
BenchmarkTree/20000_vertices_to_protobuf - ns/op 8792337 ns/op 7783107 ns/op 1.13
BenchmarkTree/20000_vertices_to_protobuf - B/op 12721743 B/op 12716972 B/op 1.00
BenchmarkTree/20000_vertices_to_protobuf - allocs/op 140028 allocs/op 140028 allocs/op 1
BenchmarkTree/20000_vertices_from_protobuf 860302022 ns/op 1697267688 B/op 580041 allocs/op 683650320 ns/op 1697268308 B/op 580046 allocs/op 1.26
BenchmarkTree/20000_vertices_from_protobuf - ns/op 860302022 ns/op 683650320 ns/op 1.26
BenchmarkTree/20000_vertices_from_protobuf - B/op 1697267688 B/op 1697268308 B/op 1.00
BenchmarkTree/20000_vertices_from_protobuf - allocs/op 580041 allocs/op 580046 allocs/op 1.00
BenchmarkTree/30000_vertices_to_protobuf 13842994 ns/op 19318364 B/op 210031 allocs/op 12119653 ns/op 19318418 B/op 210031 allocs/op 1.14
BenchmarkTree/30000_vertices_to_protobuf - ns/op 13842994 ns/op 12119653 ns/op 1.14
BenchmarkTree/30000_vertices_to_protobuf - B/op 19318364 B/op 19318418 B/op 1.00
BenchmarkTree/30000_vertices_to_protobuf - allocs/op 210031 allocs/op 210031 allocs/op 1
BenchmarkTree/30000_vertices_from_protobuf 1894569630 ns/op 3752043624 B/op 870044 allocs/op 1611340970 ns/op 3752052408 B/op 870049 allocs/op 1.18
BenchmarkTree/30000_vertices_from_protobuf - ns/op 1894569630 ns/op 1611340970 ns/op 1.18
BenchmarkTree/30000_vertices_from_protobuf - B/op 3752043624 B/op 3752052408 B/op 1.00
BenchmarkTree/30000_vertices_from_protobuf - allocs/op 870044 allocs/op 870049 allocs/op 1.00

This comment was automatically generated by workflow using github-action-benchmark.

Copy link

codecov bot commented Oct 31, 2024

Codecov Report

Attention: Patch coverage is 73.33333% with 20 lines in your changes missing coverage. Please review.

Project coverage is 46.95%. Comparing base (4485a28) to head (0ef9d5a).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
server/backend/sync/memory/publisher.go 73.01% 14 Missing and 3 partials ⚠️
client/options.go 0.00% 2 Missing ⚠️
client/client.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1052      +/-   ##
==========================================
+ Coverage   46.76%   46.95%   +0.19%     
==========================================
  Files          82       83       +1     
  Lines       12001    12041      +40     
==========================================
+ Hits         5612     5654      +42     
+ Misses       5826     5824       -2     
  Partials      563      563              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@hackerwins hackerwins marked this pull request as ready for review November 1, 2024 09:54
@hackerwins hackerwins changed the title Introduce BatchPublisher for publishing events in batch Introduce dedicated event publisher per document Nov 1, 2024
@hackerwins hackerwins merged commit 6c15a3b into main Nov 1, 2024
5 checks passed
@hackerwins hackerwins deleted the batch-publisher branch November 1, 2024 10:26
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

🧹 Outside diff range and nitpick comments (6)
test/integration/server_test.go (1)

45-45: Consider adding BatchPublisher test cases.

Since this PR introduces BatchPublisher functionality, consider adding specific test cases to verify batch publishing behavior, such as:

  • Event batching within the configured time window
  • Deduplication of events
  • Proper handling of the publishing loop

Would you like me to help draft additional test cases for BatchPublisher scenarios?

server/backend/sync/memory/publisher.go (1)

64-73: Implement event deduplication as noted in TODO

The TODO comment indicates a missing feature that's crucial for the PR's objectives of handling redundant events.

Would you like me to help implement the deduplication logic for DocumentChangedEvents? Here's a sketch of the approach:

  1. Use a map to track latest events by type
  2. Only keep the most recent event of each type
  3. Convert to slice when publishing
client/options.go (1)

97-97: Add documentation for the IsRealtime field.

Please add a documentation comment explaining the purpose and implications of the IsRealtime field. This helps users understand when to enable/disable this option.

+	// IsRealtime determines whether the document should be synchronized in real-time.
+	// When enabled, changes are automatically synchronized using batch publishing.
 	IsRealtime  bool
client/client.go (1)

Line range hint 340-344: Consider improving event handling architecture

The current implementation has several architectural concerns:

  1. Document events and watch events share the same channel, making it difficult for clients to distinguish between them.
  2. Goroutine cleanup might not be comprehensive when the stream is closed.

Consider:

  1. Separating document events and watch events into different channels.
  2. Implementing a more robust goroutine cleanup mechanism.

Here's a suggested approach for separating the events:

type WatchResponse struct {
    Type      WatchResponseType
    Presences map[string]innerpresence.Presence
+   Source    string // "document" or "watch"
    Err       error
}

And for better goroutine management:

func (c *Client) runWatchLoop(ctx context.Context, doc *document.Document) error {
+   var wg sync.WaitGroup
+   defer wg.Wait()

    // ... existing code ...

+   wg.Add(2)
    go func() {
+       defer wg.Done()
        for {
            select {
            case e := <-doc.Events():
                // ... existing code ...
            case <-ctx.Done():
                return
            }
        }
    }()

    go func() {
+       defer wg.Done()
        for {
            select {
            case r := <-doc.BroadcastRequests():
                // ... existing code ...
            case <-ctx.Done():
                return
            }
        }
    }()

    return nil
}

Also applies to: 576-577, 579-580

test/integration/document_test.go (1)

Line range hint 176-784: Consider adding edge case tests for real-time sync.

While the existing tests cover the basic functionality, consider adding tests for the following edge cases:

  1. Network disconnection during real-time sync
  2. Race conditions between multiple clients updating the same document
  3. Performance under high frequency updates
  4. Memory usage with many concurrent real-time syncs

Would you like me to help generate additional test cases for these scenarios?

server/backend/sync/memory/pubsub.go (1)

36-36: Ensure proper documentation for the publisher field

Add comments to explain the purpose of the publisher field in the Subscriptions struct for better code readability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 4485a28 and 0ef9d5a.

📒 Files selected for processing (11)
  • client/client.go (1 hunks)
  • client/options.go (2 hunks)
  • server/backend/sync/memory/publisher.go (1 hunks)
  • server/backend/sync/memory/pubsub.go (6 hunks)
  • server/backend/sync/pubsub.go (2 hunks)
  • test/bench/grpc_bench_test.go (2 hunks)
  • test/integration/admin_test.go (1 hunks)
  • test/integration/auth_webhook_test.go (1 hunks)
  • test/integration/document_test.go (10 hunks)
  • test/integration/presence_test.go (15 hunks)
  • test/integration/server_test.go (1 hunks)
🔇 Additional comments (37)
test/integration/server_test.go (1)

45-45: Verify timing sensitivity with real-time sync.

The change to real-time sync looks good, but we should verify that the test remains reliable with potential timing differences.

Let's check for similar tests that might need timing verification:

server/backend/sync/pubsub.go (2)

96-102: ⚠️ Potential issue

Enhance error handling and observability for publish timeouts.

The current implementation silently drops events on timeout, which could be problematic especially in the new batch publishing context. Consider:

  1. Adding logging when timeouts occur
  2. Exposing metrics for monitoring publish timeouts
  3. Implementing a retry mechanism or buffering strategy
 select {
 case s.Events() <- event:
     return true
 case <-gotime.After(publishTimeout):
+    log.Warnf("Timeout publishing event to subscriber %s: %v", s.subscriber, event)
+    metrics.PublishTimeouts.Inc()
     return false
 }

Let's check if there are any existing timeout handling patterns in the codebase:

#!/bin/bash
# Search for similar timeout patterns and logging
rg -A 2 "time.After|Timeout" --type go

96-102: Verify integration with BatchPublisher.

Since this is part of the new batch publishing feature, we should ensure that the timeout handling aligns with the BatchPublisher's behavior. Consider:

  1. Whether the BatchPublisher needs to be aware of these timeouts
  2. If the 100ms timeout is appropriate for batch operations
  3. How this affects the "redundant events" problem mentioned in the PR objectives

Let's examine the BatchPublisher implementation:

✅ Verification successful

Let me gather more information about the BatchPublisher implementation and its interaction with timeouts.


Let me check how the BatchPublisher handles event publishing and its relationship with the subscription timeout.


Let me check one final aspect - how the BatchPublisher processes and delivers the batched events.


Based on the gathered information, I can now provide the final response:

The timeout handling is correctly integrated with BatchPublisher

The current implementation is well-aligned with the batch publishing architecture:

  • The 100ms publish timeout in pubsub.go operates at the subscription level, protecting individual event deliveries from blocking
  • BatchPublisher handles event accumulation and batch timing independently through its window-based ticker
  • The separation of concerns is maintained: BatchPublisher focuses on batching logic while the subscription timeout handles delivery guarantees

The 100ms timeout is appropriate as it:

  • Provides sufficient time for normal event delivery
  • Doesn't block the batch processing loop
  • Helps prevent the "redundant events" problem by failing fast when subscribers are slow/unresponsive
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Look for BatchPublisher implementation
ast-grep --pattern 'type BatchPublisher struct {
  $$$
}'

Length of output: 626


Script:

#!/bin/bash
# Look for BatchPublisher methods and usage
ast-grep --pattern 'func ($_ *BatchPublisher) $$$'

# Search for timeout related configurations
rg "timeout|window" -A 3 server/backend/sync/memory/publisher.go

Length of output: 603


Script:

#!/bin/bash
# Look for publish/event handling in BatchPublisher
rg -A 10 "func.*BatchPublisher.*Publish" server/backend/sync/memory/publisher.go

# Check the subscription timeout configuration
rg "publishTimeout|timeout.*=" server/backend/sync/

Length of output: 952


Script:

#!/bin/bash
# Look for the event processing and delivery logic
rg -A 15 "processLoop|deliver" server/backend/sync/memory/publisher.go

# Check if there are any other timeout configurations in the subscription logic
rg -A 3 "func.*Subscribe|Events\(\)" server/backend/sync/

Length of output: 2138

server/backend/sync/memory/publisher.go (1)

31-38: LGTM: Thread-safe logger ID generation

The implementation correctly uses atomic operations to ensure thread-safe ID generation.

test/integration/admin_test.go (2)

89-89: LGTM: Real-time sync option aligns with the new BatchPublisher feature.

The addition of client.WithRealtimeSync() correctly implements the transition from manual to real-time synchronization as part of the BatchPublisher feature.


Line range hint 89-108: Verify if explicit sync is still necessary with real-time synchronization.

With the introduction of real-time synchronization, the explicit sync call after receiving the document change event might be redundant:

if resp.Type == client.DocumentChanged {
    err := c1.Sync(ctx, client.WithDocKey(d1.Key()))  // Is this still needed?
    assert.NoError(t, err)
    return
}

Let's verify the sync behavior in the codebase:

test/bench/grpc_bench_test.go (3)

205-205: LGTM: Real-time sync enabled for first client.

The addition of client.WithRealtimeSync() aligns with the PR's objective of implementing batch publishing for events.


215-215: LGTM: Real-time sync enabled for second client.

The addition of client.WithRealtimeSync() maintains consistency with the first client's configuration.


Line range hint 205-215: Verify benchmark coverage for batch publishing.

While the real-time sync is properly enabled, we should ensure the benchmark adequately tests the new batch publishing functionality.

Consider adding specific benchmark cases to measure:

  1. Performance under high-frequency event publishing
  2. Impact of different batch window sizes
  3. Lock contention scenarios

Would you like me to help draft additional benchmark test cases?

test/integration/auth_webhook_test.go (1)

177-177: LGTM: Test maintains authentication verification while adopting real-time sync.

The change from manual to real-time synchronization aligns with the broader PR changes introducing BatchPublisher, while preserving the test's core purpose of verifying authentication behavior for document attachment.

Let's verify that this test still effectively catches authentication failures:

✅ Verification successful

Let me verify the test's error handling around the specific Attach call by checking its context:


Change verified: Authentication test remains robust with real-time sync option

The test correctly verifies authentication failures by:

  • Asserting CodeUnauthenticated error when attempting to attach without proper authentication
  • Confirming subsequent operations fail with ErrDocumentNotAttached
  • Maintaining consistent error handling across multiple authentication scenarios in the test file
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that the test properly handles authentication failures
# by checking for appropriate error handling patterns

# Test: Look for authentication error assertions
rg -A 2 "connect\.CodeUnauthenticated.*connect\.CodeOf\(err\)" test/integration/auth_webhook_test.go

Length of output: 641


Script:

#!/bin/bash
# Get the context around the modified Attach call to verify error handling
rg -B 5 -A 5 "cli\.Attach\(ctx, doc, client\.WithRealtimeSync\(\)\)" test/integration/auth_webhook_test.go

Length of output: 421

client/client.go (1)

Line range hint 340-344: Breaking Change: Document synchronization behavior modified

The change from IsManual to IsRealtime aligns with the PR objective of implementing batch publishing. However, this is a breaking change that modifies how document synchronization is initiated.

Let's verify the impact on existing clients:

test/integration/document_test.go (8)

176-176: LGTM: Watch document changed event test properly configured with real-time sync.

The test correctly uses WithRealtimeSync() for both documents to ensure real-time propagation of changes.

Also applies to: 180-180


434-434: LGTM: Document removal test properly handles real-time sync.

The test correctly uses WithRealtimeSync() to ensure proper cleanup of watchers during document removal.


459-459: LGTM: Broadcast test properly configured with real-time sync.

The test correctly uses WithRealtimeSync() for all three documents to ensure real-time propagation of broadcast events.

Also applies to: 465-465, 471-471


521-521: LGTM: Unsubscribe test properly handles real-time sync.

The test correctly uses WithRealtimeSync() to verify that unsubscribed clients don't receive broadcasts in real-time.

Also applies to: 527-527


579-579: LGTM: Unsubscriber broadcast test properly configured.

The test correctly uses WithRealtimeSync() to verify that unsubscribed clients can still broadcast events.

Also applies to: 586-586


625-625: LGTM: Unserializable payload test properly configured.

The test correctly uses WithRealtimeSync() to verify payload validation in real-time sync mode.


648-648: LGTM: Error handling test properly configured.

The test correctly uses WithRealtimeSync() to verify error propagation during real-time sync.

Also applies to: 654-654


732-732: LGTM: Cross-project test properly configured.

The test correctly uses WithRealtimeSync() to verify document isolation between different projects in real-time sync mode.

Also applies to: 771-771, 784-784

server/backend/sync/memory/pubsub.go (3)

150-150: Ensure thread safety when closing subscriptions

Calling subs.Close() within the Delete callback might lead to concurrency issues if other goroutines are accessing subs. Verify that this operation is thread-safe.

Consider reviewing synchronization mechanisms around subs.Close().


181-181: Confirm subscribers receive events correctly after batching

With the introduction of batching, ensure that events are dispatched to subscribers as expected and that there are no delays or missed notifications.

Would you like assistance in writing tests to validate the new batching behavior?


59-60: Update all calls to the modified Publish method

Ensure that all invocations of Publish reflect the new signature without the context parameter.

Run the following script to find any outdated calls:

test/integration/presence_test.go (15)

214-223: Ensure proper cleanup and synchronization in the new test case.

The new "watch after attach events test" case follows the established pattern of creating clients, attaching documents, and cleaning up resources using defer statements. The usage of client.WithRealtimeSync() aligns with the real-time synchronization approach.


224-256: Verify the behavior of the new "watch after attach events test" case.

The test case sets up a goroutine to handle document events and waits for a specific event using wgEvents. The logic for handling events, timeouts, and unexpected stream closures is consistent with other test cases.


258-271: Confirm the expected behavior in the new test case.

The test case expects a DocumentWatched event with the presence of c2 after c1 syncs the changes related to c2's attachment. The expected slice is populated accordingly, and the received events are compared against it using assert.Equal.


274-333: Ensure proper cleanup and synchronization in the new "attach after watch events test" case.

The new test case follows the established pattern of creating clients, attaching documents, and cleaning up resources using defer statements. The usage of client.WithRealtimeSync() aligns with the real-time synchronization approach.

The test case sets up a goroutine to handle document events and waits for a specific event using wgEvents. The logic for handling events, timeouts, and unexpected stream closures is consistent with other test cases.

The expected behavior is defined by populating the expected slice with a DocumentWatched event and the presence of c2. The received events are compared against the expected events using assert.Equal.


340-343: Verify the usage of client.WithRealtimeSync() in the modified test case.

The addition of client.WithRealtimeSync() in the Attach calls enables real-time synchronization for the clients in the "unwatch after detach events test" case. This change aligns with the overall shift towards real-time synchronization mentioned in the AI-generated summary.


Line range hint 370-390: Ensure proper synchronization and event handling in the modified test case.

The modifications in the "unwatch after detach events test" case include the addition of wgEvents.Done() to signal the completion of event handling and wgEvents.Wait() to wait for the expected events before proceeding with assertions. These changes enhance the reliability of the test by ensuring that the expected events are captured before making assertions.


406-407: Verify the synchronization and event handling in the modified test case.

The addition of wgEvents.Wait() ensures that the test waits for the expected events to be captured before proceeding with the assertion. This change enhances the reliability of the test by preventing premature assertions.


429-432: Confirm the usage of client.WithRealtimeSync() in the modified test case.

The addition of client.WithRealtimeSync() in the Attach calls enables real-time synchronization for the clients in the "detach after unwatch events test" case. This change aligns with the overall shift towards real-time synchronization mentioned in the AI-generated summary.


459-460: Verify the synchronization and event handling in the modified test case.

The addition of wgEvents.Done() signals the completion of event handling, allowing the test to proceed with assertions. This change enhances the reliability of the test by ensuring that the expected events are captured before making assertions.


478-479: Confirm the synchronization and event handling in the modified test case.

The addition of wgEvents.Wait() ensures that the test waits for the expected events to be captured before proceeding with the next step. This change enhances the reliability of the test by preventing premature execution of subsequent code.


495-496: Verify the synchronization and event handling in the modified test case.

The addition of wgEvents.Wait() ensures that the test waits for the expected events to be captured before proceeding with the next step. This change enhances the reliability of the test by preventing premature execution of subsequent code.


514-578: Ensure proper cleanup, synchronization, and event handling in the new "watch after update events test" case.

The new test case follows the established pattern of creating clients, attaching documents, and cleaning up resources using defer statements. The usage of client.WithRealtimeSync() aligns with the real-time synchronization approach.

The test case sets up a goroutine to handle document events and waits for a specific event using wgEvents. The logic for handling events, timeouts, and unexpected stream closures is consistent with other test cases.

The expected behavior is defined by populating the expected slice with a DocumentWatched event and the updated presence of c2. The received events are compared against the expected events using assert.Equal.


580-636: Verify the behavior and synchronization in the new "watch after detach events test" case.

The new test case follows the established pattern of creating clients, attaching documents, and cleaning up resources using defer statements. The usage of client.WithRealtimeSync() aligns with the real-time synchronization approach.

The test case sets up a goroutine to handle document events and uses wgEvents to control the test flow. The logic for handling events, timeouts, and unexpected events is consistent with other test cases.

The test case verifies that no presence events are received after c2 detaches from the document and c1 syncs the changes. The assertion assert.Equal(t, 0, len(responsePairs)) ensures that no unexpected events are captured.


Line range hint 646-693: Confirm the behavior and synchronization in the modified "watching multiple documents test" case.

The modifications in the test case include the addition of client.WithRealtimeSync() in the Attach calls, aligning with the real-time synchronization approach.

The test case sets up a goroutine to handle document events and uses wgEvents to control the test flow. The logic for handling events, timeouts, and unexpected stream closures is consistent with other test cases.

The expected behavior is defined by populating the expected slice with DocumentWatched and DocumentUnwatched events and the corresponding presences. The received events are compared against the expected events using assert.Equal.


131-133: Verify the usage of client.WithRealtimeSync() in the codebase.

The addition of client.WithRealtimeSync() in the Attach calls enables real-time synchronization for the clients. This change aligns with the introduction of the BatchPublisher and the shift towards real-time synchronization mentioned in the AI-generated summary.

To ensure that this option is used consistently across the codebase, run the following script:

✅ Verification successful

Let me search for the WithRealtimeSync option to understand its usage pattern.


Let me check the definition and implementation of WithRealtimeSync to understand its purpose.


Based on the findings, I can now generate the final response:

WithRealtimeSync() is correctly used in presence-related tests

The WithRealtimeSync() option is specifically designed for enabling real-time synchronization by setting IsRealtime = true in the attach options. Its usage in presence tests is appropriate as presence functionality requires real-time updates. The option is also consistently used in other test files where real-time synchronization is needed, such as document tests and server tests.

The absence of this option in other test files is intentional, as not all test scenarios require real-time synchronization. This follows the principle of explicit opt-in for real-time features.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of `client.WithRealtimeSync()` in `Attach` calls.

# Test: Search for `Attach` calls without `client.WithRealtimeSync()`. 
# Expect: No occurrences found (empty output).
rg --type go --glob '!test/integration/presence_test.go' $'Attach\(ctx, [^,]+\)' 

Length of output: 19648


Script:

#!/bin/bash
# Search for the definition and usage of WithRealtimeSync
rg --type go "WithRealtimeSync"

Length of output: 3924


Script:

#!/bin/bash
# Get the implementation of WithRealtimeSync and its usage in the client package
ast-grep --pattern 'func WithRealtimeSync() AttachOption {
  $$$
}'

Length of output: 231

Comment on lines +29 to +32
const (
// publishTimeout is the timeout for publishing an event.
publishTimeout = 100 * gotime.Millisecond
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider making publishTimeout configurable.

While extracting the timeout to a constant improves maintainability, consider making it configurable through environment variables or configuration settings. This would allow tuning the timeout for different deployment scenarios without code changes.

 const (
-	// publishTimeout is the timeout for publishing an event.
-	publishTimeout = 100 * gotime.Millisecond
+	// defaultPublishTimeout is the default timeout for publishing an event.
+	defaultPublishTimeout = 100 * gotime.Millisecond
 )
+
+// PublishTimeout returns the configured timeout for publishing events
+func PublishTimeout() time.Duration {
+	if val := os.Getenv("YORKIE_PUBLISH_TIMEOUT"); val != "" {
+		if duration, err := time.ParseDuration(val); err == nil {
+			return duration
+		}
+	}
+	return defaultPublishTimeout
+}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +40 to +49
// BatchPublisher is a publisher that publishes events in batch.
type BatchPublisher struct {
logger *zap.SugaredLogger
mutex gosync.Mutex
events []sync.DocEvent

window time.Duration
closeChan chan struct{}
subs *Subscriptions
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding capacity hints to the events slice

The events slice will be frequently appended to and cleared. Consider initializing it with a capacity hint based on expected batch sizes to reduce allocations.

 type BatchPublisher struct {
 	logger *zap.SugaredLogger
 	mutex  gosync.Mutex
-	events []sync.DocEvent
+	events []sync.DocEvent // TODO: Add field comment about typical capacity

 	window    time.Duration
 	closeChan chan struct{}
 	subs      *Subscriptions
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +128 to +131
// Close stops the batch publisher
func (bp *BatchPublisher) Close() {
close(bp.closeChan)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance Close method to ensure graceful shutdown

The current implementation might lose events that haven't been published yet. Consider implementing a more graceful shutdown:

+// Close stops the batch publisher and ensures all pending events are published
 func (bp *BatchPublisher) Close() {
+	bp.mutex.Lock()
+	if len(bp.events) > 0 {
+		// Publish remaining events
+		bp.publish()
+	}
+	bp.mutex.Unlock()
+
 	close(bp.closeChan)
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +89 to +101
func (bp *BatchPublisher) publish() {
bp.mutex.Lock()

if len(bp.events) == 0 {
bp.mutex.Unlock()
return
}

events := bp.events
bp.events = nil

bp.mutex.Unlock()

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider optimizing lock handling

The lock is held while checking for empty events. Consider restructuring to minimize lock duration:

 func (bp *BatchPublisher) publish() {
 	bp.mutex.Lock()
+	events := bp.events
+	bp.events = nil
+	bp.mutex.Unlock()
 
-	if len(bp.events) == 0 {
-		bp.mutex.Unlock()
+	if len(events) == 0 {
 		return
 	}
-
-	events := bp.events
-	bp.events = nil
-
-	bp.mutex.Unlock()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (bp *BatchPublisher) publish() {
bp.mutex.Lock()
if len(bp.events) == 0 {
bp.mutex.Unlock()
return
}
events := bp.events
bp.events = nil
bp.mutex.Unlock()
func (bp *BatchPublisher) publish() {
bp.mutex.Lock()
events := bp.events
bp.events = nil
bp.mutex.Unlock()
if len(events) == 0 {
return
}

Comment on lines +89 to +108
func (bp *BatchPublisher) publish() {
bp.mutex.Lock()

if len(bp.events) == 0 {
bp.mutex.Unlock()
return
}

events := bp.events
bp.events = nil

bp.mutex.Unlock()

if logging.Enabled(zap.DebugLevel) {
bp.logger.Infof(
"Publishing batch of %d events for document %s",
len(bp.events),
bp.subs.docKey,
)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect logging of batch size

The logging statement uses bp.events after it has been set to nil. It should use events instead.

 	if logging.Enabled(zap.DebugLevel) {
 		bp.logger.Infof(
 			"Publishing batch of %d events for document %s",
-			len(bp.events),
+			len(events),
 			bp.subs.docKey,
 		)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (bp *BatchPublisher) publish() {
bp.mutex.Lock()
if len(bp.events) == 0 {
bp.mutex.Unlock()
return
}
events := bp.events
bp.events = nil
bp.mutex.Unlock()
if logging.Enabled(zap.DebugLevel) {
bp.logger.Infof(
"Publishing batch of %d events for document %s",
len(bp.events),
bp.subs.docKey,
)
}
func (bp *BatchPublisher) publish() {
bp.mutex.Lock()
if len(bp.events) == 0 {
bp.mutex.Unlock()
return
}
events := bp.events
bp.events = nil
bp.mutex.Unlock()
if logging.Enabled(zap.DebugLevel) {
bp.logger.Infof(
"Publishing batch of %d events for document %s",
len(events),
bp.subs.docKey,
)
}

Comment on lines +114 to +116
// WithRealtimeSync configures the manual sync of the client.
func WithRealtimeSync() AttachOption {
return func(o *AttachOptions) { o.IsRealtime = true }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect documentation comment.

The documentation comment mentions "manual sync" but the function enables real-time sync. Please update it to accurately reflect the function's purpose.

-// WithRealtimeSync configures the manual sync of the client.
+// WithRealtimeSync enables real-time synchronization for the document.
+// When enabled, changes are automatically synchronized using batch publishing
+// instead of requiring manual synchronization.
 func WithRealtimeSync() AttachOption {
 	return func(o *AttachOptions) { o.IsRealtime = true }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// WithRealtimeSync configures the manual sync of the client.
func WithRealtimeSync() AttachOption {
return func(o *AttachOptions) { o.IsRealtime = true }
// WithRealtimeSync enables real-time synchronization for the document.
// When enabled, changes are automatically synchronized using batch publishing
// instead of requiring manual synchronization.
func WithRealtimeSync() AttachOption {
return func(o *AttachOptions) { o.IsRealtime = true }

@@ -94,7 +94,7 @@
// Presence is the presence of the client.
Presence innerpresence.Presence
InitialRoot map[string]any
IsManual bool
IsRealtime bool
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Update comment in WithRealtimeSync function

The migration from manual to realtime sync is complete, but there's a minor documentation issue. The comment for WithRealtimeSync incorrectly mentions "manual sync" instead of "realtime sync". The reference in install-crds.yaml is unrelated as it's for ArgoCD configuration.

  • client/options.go: Update the comment // WithRealtimeSync configures the manual sync of the client. to // WithRealtimeSync configures the realtime sync of the client.
🔗 Analysis chain

Verify removal of manual sync references.

The change from manual to real-time synchronization is a significant change. Let's verify that all references to manual synchronization have been properly updated.


Let's check the content of the files that still contain manual sync references to understand the context.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining references to manual sync that might need updating

# Test 1: Search for IsManual references
echo "Searching for IsManual references..."
rg "IsManual" -l

# Test 2: Search for WithManualSync references
echo "Searching for WithManualSync references..."
rg "WithManualSync" -l

# Test 3: Search for "manual sync" or "manual synchronization" in comments
echo "Searching for manual sync references in comments..."
rg -i "manual.?sync" -l

Length of output: 424


Script:

#!/bin/bash
# Check content of files with manual sync references
echo "Content of client/options.go with manual sync references:"
rg -i "manual.?sync" -C 3 client/options.go

echo -e "\nContent of build/charts/yorkie-argocd/crds/install-crds.yaml with manual sync references:"
rg -i "manual.?sync" -C 3 build/charts/yorkie-argocd/crds/install-crds.yaml

Length of output: 981

docKey: docKey,
internalMap: cmap.New[string, *sync.Subscription](),
}
s.publisher = NewBatchPublisher(s, 100*gotime.Millisecond)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Make batch interval configurable

The batch interval 100*gotime.Millisecond is hardcoded. Consider making this value configurable to allow flexibility for different use cases and environments.

For example, you might introduce a configuration parameter or pass it as an argument to NewBatchPublisher.

@@ -18,6 +18,7 @@

import (
"context"
gotime "time"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use standard alias for the time package

Consider using the standard alias time instead of gotime for clarity and consistency across the codebase.

Apply this diff to update the import:

-import gotime "time"
+import "time"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
gotime "time"
"time"

Comment on lines +79 to +81
func (s *Subscriptions) Close() {
s.publisher.Close()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle errors during publisher closure

The Close method calls s.publisher.Close() but does not handle potential errors. Consider checking for errors to ensure resources are released properly.

Apply this diff to handle the error:

 func (s *Subscriptions) Close() {
-	s.publisher.Close()
+	if err := s.publisher.Close(); err != nil {
+		// Handle the error appropriately, e.g., log it
+		logging.DefaultLogger().Error(err)
+	}
 }

Committable suggestion skipped: line range outside the PR's diff.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants